﻿#include "clientprocess.h"

#include <chrono>

#include <data/replypacket.h>
#include <data/requestpacket.h>
#include <data/startargs.h>
#include <data/stopargs.h>

#include <context/context.h>
#include <QsLog.h>

#include <QThread>
#include <QTime>
#include <ratecontrol.h>

/**
 * @brief The Record class
 * One request record: remembers the flag the request was sent with and
 * measures the time between start() and the first stop().
 *
 * Timing uses a monotonic clock held by value, so the class owns no raw
 * resource and the compiler-generated copy/move/destructor are correct.
 *
 * NOTE(review): stop() appears to be reachable both from the reply callback
 * and from the worker thread; this class does no synchronization itself -
 * confirm the threading model of the callers.
 */
class Record
{
public:
    /**
     * @brief Record creates an un-stopped record.
     * @param flags flag of the request this record belongs to.
     */
    explicit Record(int flags)
        : m_flags(flags)
        , m_start(Clock::now()) // so that stop() without start() yields ~0 instead of garbage
        , m_elaped(-1.0f)
        , m_stopped(false)
    {
    }
    /**
     * @brief start begins (or restarts) the measurement.
     */
    void start()
    {
        m_start = Clock::now();
    }
    /**
     * @brief stop ends the measurement. Only the first call has an effect;
     * later calls keep the already recorded value.
     */
    void stop()
    {
        if(m_stopped)
        {
            return;
        }
        m_stopped = true;
        // Whole milliseconds, matching the resolution the callers compare against their timeout.
        const std::chrono::milliseconds spent =
                std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - m_start);
        m_elaped = static_cast<float>(spent.count());
    }
    /**
     * @brief elapsed recorded duration.
     * @return milliseconds between start() and the first stop(), or -1 if
     * stop() has not been called yet.
     */
    float elapsed() const
    {
        return m_elaped;
    }
    /**
     * @brief flags flag of the request this record was created for.
     * @return the value passed to the constructor.
     */
    int flags() const
    {
        return m_flags;
    }
private:
    using Clock = std::chrono::steady_clock;

    int m_flags; // Flag of the request.
    Clock::time_point m_start; // Moment the measurement started.
    float m_elaped; // Measured milliseconds, -1 while not stopped.
    bool m_stopped; // True once stop() has been called.
};

ClientProcess::ClientProcess(Context *context)
{
    m_context = context;

    // The worker thread is only created here; it runs runWorker() once
    // startRequest() decides to start it.
    m_workThread = QThread::create([this] { runWorker(); });
}

/**
 * Stops and releases the worker thread, unregisters this object from the
 * context and frees all request records.
 *
 * The QThread returned by QThread::create() is owned by the caller, so it has
 * to be released here.
 */
ClientProcess::~ClientProcess()
{
    if(m_workThread)
    {
        if(m_workThread == QThread::currentThread())
        {
            // Destroyed from inside the worker itself: the thread can neither be
            // waited for nor deleted while it is executing this code, so let it
            // delete itself once it has finished.
            QObject::connect(m_workThread, &QThread::finished,
                             m_workThread, &QObject::deleteLater);
        }
        else
        {
            if(m_workThread->isRunning())
            {
                m_workThread->requestInterruption();
                m_workThread->wait(); // Wait until the thread has finished.
            }
            delete m_workThread;
        }
        m_workThread = Q_NULLPTR;
    }
    m_context->remove(this);

    for(Record *item : m_records)
    {
        delete item;
    }
    m_records.clear();
}

void ClientProcess::onStateChanged(PointerCallback::State state, bool bCmd)
{
    // Any state other than Connected is ignored (a cancel() call for that
    // case is disabled in this file).
    const bool connected = (state == PointerCallback::Connected);

    // bCmd == false: the data link has finished establishing its connection.
    if(connected && !bCmd)
    {
        startRequest();
    }
}

void ClientProcess::onStart(const StartArgs &args)
{
    // The client side does not handle start notifications.
    static_cast<void>(args);
}

void ClientProcess::onStop(const StopArgs &args)
{
    // Nothing to do; a cancel() call used to sit here and is disabled.
    static_cast<void>(args);
}

void ClientProcess::onRequest(const RequestPacket &pack)
{
    int flg = pack.flag();
    QByteArray content = pack.content();
    ReplyPacket reply; // 构造响应包。
    reply.setFlag(flg); // 设置响应的回复。
    reply.setContent(content); // 设置内容。
    m_context->reply(reply);
}

void ClientProcess::onReply(const ReplyPacket &pack)
{
    // 接收到响应包，停止计时
     int flag = pack.flag();
     int currentSize = m_records.size();
     if(flag >= currentSize || flag < 0)
     {
         // 打印日志，不是期望的数据包。
         QString msg = QString(QObject::tr("Receive request unexpect reply flag :%1, current record size:%2:%3."))
                 .arg(flag).arg(currentSize).arg(m_records.size());
         QLOG_WARN() << msg;
         return ;
     }
     m_records[flag]->stop();
}

void ClientProcess::onReady(const ReadyArgs &args)
{
    Q_UNUSED(args)
    startRequest();
}

/**
 * Counts one "ready to start" signal. In this file the callers are
 * onStateChanged() and onReady(); once two signals have been counted the
 * counter is reset and the worker thread is started.
 */
void ClientProcess::startRequest()
{
    // Use the value returned by the atomic increment instead of re-reading the
    // counter: a separate read lets two concurrent callers both observe ">= 2"
    // and both reset the counter and start the thread.
    const int countAfterThisCall = m_startCounter.fetchAndAddAcquire(1) + 1;
    if(countAfterThisCall >= 2)
    {
        m_startCounter.store(0); // Reset for the next round.
        m_workThread->start();
    }
}

void ClientProcess::runWorker()
{
    //  性能测试主要逻辑。
    StartArgs *startArgs = m_context->startArgs();

    runRequest(startArgs); // 执行请求。

    workComplete(startArgs);// 统计结果。
}

void ClientProcess::runRequest(const StartArgs *startArgs)
{
    int sampling = startArgs->packFreq();// 每秒的数据包发送次数。
    int totalReqCount = startArgs->totalCount(); // 总请求次数。通过start参数赋值。
    int timeOut = startArgs->timeOut(); // 超时时间。 毫秒。
    int reqSize = startArgs->perPackSize(); // 每包数据长度。
    int warmCount = startArgs->warmPack(); // 热身数据包。

    QLOG_INFO() << QString(QObject::tr("Run request:%1; sampling:%2; requestCount:%3; timeOut(ms):%4, warm packet:%5."))
                   .arg(reqSize)
                   .arg(sampling)
                   .arg(totalReqCount)
                   .arg(timeOut)
                   .arg(warmCount);

    RateControl ctrl(sampling);
    ctrl.start();

    RequestPacket pack;
    QByteArray reqContent(reqSize, char(0x88));
    pack.setContent(reqContent);

    totalReqCount += warmCount; // 总包数 = 热身包 + 请求总包数。
    m_records.reserve(totalReqCount);
    for (int i = 0; i < totalReqCount; i++) {
        pack.setFlag(i);
        if(i >= warmCount)
        {
            m_context->setProcPercent(float(i - warmCount) / float(totalReqCount - warmCount));
        }
        if(!request(ctrl, pack)){
            break;
        }
    }
}

void ClientProcess::workComplete(const StartArgs *startArgs)
{
    int timeout = startArgs->timeOut();
    int warmCount = startArgs->warmPack();
    QThread::msleep(uint(timeout)); // 等待未处理完成的。

    foreach(auto item, m_records) // 所有停止计数。
    {
        item->stop();
    }
    // 执行完毕。 开始计算结果
    StopArgs stopArgs;
    resultCompute(timeout, warmCount, stopArgs);
    m_context->stop(stopArgs);  // 通知外部显示、处理结果。
    if(m_workThread->isInterruptionRequested())
    {
       return;
    }
}

/**
 * Computes the statistics of the finished run from m_records.
 *
 * The first warmCount records are warm-up packets and are ignored. Of the
 * remaining records, those whose elapsed time exceeds timeOut are counted as
 * timed out; all others contribute to average / maximum / minimum.
 *
 * @param timeOut   timeout in milliseconds.
 * @param warmCount number of leading warm-up records to skip.
 * @param args      receives the PerfResult via setResult().
 */
void ClientProcess::resultCompute(int timeOut, int warmCount, StopArgs &args)
{
    const int recordCnt = m_records.size();

    // Sending may have stopped early, so fewer records than warm-up packets can
    // exist; clamp so that the measured count never becomes negative.
    int firstMeasured = warmCount;
    if(firstMeasured < 0)
    {
        firstMeasured = 0;
    }
    if(firstMeasured > recordCnt)
    {
        firstMeasured = recordCnt;
    }
    const int measuredCnt = recordCnt - firstMeasured;

    int timeOutCnt = 0;
    float maxTime = 0;
    float minTime = timeOut;
    double totalElapsed = 0.0; // double: a float sum loses whole milliseconds on long runs.

    for(int i = firstMeasured; i < recordCnt; ++i)
    {
        const float tmpElapsed = m_records.at(i)->elapsed();
        if(tmpElapsed > timeOut)
        {
            timeOutCnt++;
            continue;
        }
        // Added as a floating point value; converting a negative value to an
        // unsigned integer first would be undefined.
        totalElapsed += tmpElapsed;
        if(maxTime < tmpElapsed)
        {
            maxTime = tmpElapsed;
        }
        if(minTime > tmpElapsed)
        {
            minTime = tmpElapsed;
        }
    }

    // Timed-out packets and warm-up packets are both excluded from the sum, so
    // neither may be part of the divisor. No valid sample -> average 0.
    const int validCnt = measuredCnt - timeOutCnt;
    const float avergeElaped = validCnt > 0 ? float(totalElapsed / validCnt) : 0.0f;

    PerfResult result(measuredCnt, timeOutCnt, avergeElaped, maxTime, minTime);
    args.setResult(result);
}

bool ClientProcess::request(RateControl &rateCtrl, RequestPacket &pack)
{
    // Register the record first and start its timer right before sending.
    Record *const record = new Record(pack.flag());
    m_records.append(record);
    record->start();

    if(!m_context->request(pack))
    {
        return false; // Sending failed.
    }

    // Account for the sample; the result is how long to pause before the next packet.
    const int pauseMs = rateCtrl.addSample(1);

    if(m_workThread->isInterruptionRequested())
    {
        return false; // The worker was asked to stop.
    }

    if(pauseMs > 0)
    {
        QThread::msleep(uint(pauseMs));
    }
    return true;
}
